* Added option to fsync the buffer on a configurable interval.
- * Use posix_fadvise() on the buffer file. (@Svedrin)
\ No newline at end of file
+ * Use posix_fadvise() on the buffer file. (@Svedrin)
+
+ * Refactor buffer and cleanup alternative buffer path.
\ No newline at end of file
long int procinfo_total_virtual_memory(void);
/* Total Open Files */
-long int procinfo_open_files(const char * path);
+long int procinfo_open_files(const char * path, int include_fd);
#endif /* PROCINFO_H_ */
#ifndef SIRIDB_BUFFER_H_
#define SIRIDB_BUFFER_H_
+typedef struct siridb_buffer_s siridb_buffer_t;
+
#include <siri/db/db.h>
#include <siri/db/series.h>
#include <siri/db/points.h>
#define MAX_BUFFER_SZ 10485760
+
+siridb_buffer_t * siridb_buffer_new(void);
+void siridb_buffer_free(siridb_buffer_t * buffer);
int siridb_buffer_new_series(
- siridb_t * siridb,
+ siridb_buffer_t * buffer,
siridb_series_t * series);
-int siridb_buffer_open(siridb_t * siridb);
+int siridb_buffer_open(siridb_buffer_t * buffer);
int siridb_buffer_load(siridb_t * siridb);
-void siridb_buffer_free(siridb_t * siridb);
int siridb_buffer_write_empty(
- siridb_t * siridb,
+ siridb_buffer_t * buffer,
siridb_series_t * series);
int siridb_buffer_write_last_point(
- siridb_t * siridb,
+ siridb_buffer_t * buffer,
siridb_series_t * series);
-int siridb_buffer_fsync(siridb_t * siridb);
+struct siridb_buffer_s
+{
+ size_t size; /* size for one series inside the buffer */
+ size_t nsize; /* optional new size from database.conf */
+ size_t len; /* number of points allocated per series */
+ char * template; /* template for writing an empty buffer */
+ char * path; /* path where the buffer file is stored */
+ slist_t * empty; /* list with empty buffer spaces */
+ FILE * fp; /* buffer file pointer */
+ int fd; /* buffer file descriptor */
+};
+
+static inline int siridb_buffer_fsync(siridb_buffer_t * buffer)
+{
+ return (buffer->fp == NULL) ? 0 : fsync(buffer->fd);
+}
#endif /* SIRIDB_BUFFER_H_ */
#include <siri/db/groups.h>
#include <siri/db/tasks.h>
#include <siri/db/time.h>
+#include <siri/db/buffer.h>
int32_t siridb_get_uptime(siridb_t * siridb);
int8_t siridb_get_idle_percentage(siridb_t * siridb);
uint32_t list_limit;
uuid_t uuid;
iso8601_tz_t tz;
- size_t buffer_size;
- size_t buffer_len;
- char * buffer_clear;
struct timespec start_time; /* to calculate up-time. */
uint64_t duration_num; /* number duration in s, ms, us or ns */
uint64_t duration_log; /* log duration in s, ms, us or ns */
char * dbname;
char * dbpath;
- char * buffer_path;
double drop_threshold;
size_t received_points;
size_t selected_points;
- slist_t * empty_buffers;
+
siridb_time_t * time;
siridb_server_t * server;
siridb_server_t * replica;
uv_mutex_t series_mutex;
uv_mutex_t shards_mutex;
imap_t * shards;
- FILE * buffer_fp;
FILE * dropped_fp;
qp_fpacker_t * store;
siridb_fifo_t * fifo;
siridb_replicate_t * replicate;
siridb_reindex_t * reindex;
siridb_groups_t * groups;
+ siridb_buffer_t * buffer;
siridb_tasks_t tasks;
};
#endif
#ifdef __APPLE__
-long int procinfo_open_files(const char * path)
+long int procinfo_open_files(const char * path, int include_fd)
{
pid_t pid = getpid();
size_t len = strlen(path);
if ( res == PROC_PIDFDVNODEPATHINFO_SIZE &&
strncmp(path, vnode_info.pvip.vip_path, len) == 0)
{
+ vnode_info
count++;
}
+ else if (
+ res == PROC_PIDFDVNODEPATHINFO_SIZE &&
+ include_fd >= 0 &&
+ include_fd == fd_info[i].proc_fd)
+ {
+ include_fd = -1;
+ count++;
+ };
+
}
}
free(fd_info);
return count;
}
#else
-long int procinfo_open_files(const char * path)
+long int procinfo_open_files(const char * path, int include_fd)
{
long int count = 0;
DIR * dirp;
if (entry->d_type == DT_REG || entry->d_type == DT_LNK)
{
snprintf(buffer, XPATH_MAX, "/proc/self/fd/%s", entry->d_name);
-
if (realpath(buffer, buf) == NULL)
{
continue;
{
count++;
}
+ else if (
+ include_fd >= 0 &&
+ include_fd == strtol(entry->d_name, NULL, 10))
+ {
+ include_fd = -1;
+ count++;
+ };
}
}
closedir(dirp);
siridb_fifo_close(siridb->fifo);
}
- if (siridb->buffer_fp != NULL)
+ if (siridb->buffer->fp != NULL)
{
- if (fclose(siridb->buffer_fp) == 0)
+ if (fclose(siridb->buffer->fp) == 0)
{
- siridb->buffer_fp = NULL;
+ siridb->buffer->fp = NULL;
}
else
{
siridb = (siridb_t *) siridb_node->data;
/* flush the buffer, maybe on each insert or another interval? */
- if (siridb_buffer_fsync(siridb))
+ if (siridb_buffer_fsync(siridb->buffer))
{
log_critical("fsync() has failed on the buffer file");
}
/* when set to 1, no caching is done. 1 is the minimum value. */
#define SIRIDB_BUFFER_CACHE 64
-static int buffer__create_new(siridb_t * siridb, siridb_series_t * series);
-static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series);
+static int buffer__create_new(
+ siridb_buffer_t * buffer,
+ siridb_series_t * series);
+static int buffer__use_empty(
+ siridb_buffer_t * buffer,
+ siridb_series_t * series);
static void buffer__migrate_to_new(char * pt, size_t sz);
/* buffer__start cannot conflict with a series_id since id 0 is never used */
static const uint64_t buffer__end = 0xffffffffffffffff;
+siridb_buffer_t * siridb_buffer_new(void)
+{
+ siridb_buffer_t * buffer = malloc(sizeof(siridb_buffer_t));
+ if (buffer == NULL)
+ {
+ return NULL;
+ }
+ buffer->empty = slist_new(SLIST_DEFAULT_SIZE);
+ if (buffer->empty == NULL)
+ {
+ free(buffer);
+ return NULL;
+ }
+ buffer->fd = 0;
+ buffer->fp = NULL;
+ buffer->len = 0;
+ buffer->nsize = 0;
+ buffer->path = NULL;
+ buffer->size = 0;
+ buffer->template = NULL;
+
+ return buffer;
+}
+
+void siridb_buffer_free(siridb_buffer_t * buffer)
+{
+ if (buffer->fp != NULL)
+ {
+ fclose(buffer->fp);
+ }
+ free(buffer->template);
+ free(buffer->path);
+ slist_free(buffer->empty);
+ free(buffer);
+}
+
/*
* Returns 0 if success or EOF in case of an error.
*/
int siridb_buffer_write_empty(
- siridb_t * siridb,
+ siridb_buffer_t * buffer,
siridb_series_t * series)
{
- memcpy(siridb->buffer_clear + 4, &series->id, sizeof(uint32_t));
+ memcpy(buffer->template + 4, &series->id, sizeof(uint32_t));
return (
/* go to the series position in buffer */
- fseeko( siridb->buffer_fp,
+ fseeko( buffer->fp,
series->bf_offset,
SEEK_SET) ||
/* write end ts */
- fwrite( siridb->buffer_clear,
- siridb->buffer_size,
+ fwrite( buffer->template,
+ buffer->size,
1,
- siridb->buffer_fp) != 1) ? EOF : 0;
+ buffer->fp) != 1) ? EOF : 0;
}
/*
* Returns 0 if success or EOF in case of an error.
*/
int siridb_buffer_write_last_point(
- siridb_t * siridb,
+ siridb_buffer_t * buffer,
siridb_series_t * series)
{
siridb_point_t * point;
return (
/* jump to position where to write the new point */
- fseeko( siridb->buffer_fp,
+ fseeko( buffer->fp,
series->bf_offset + 8 + (16 * last_idx),
SEEK_SET) ||
/* write time-stamp and value */
- fwrite(buf, sz, 1, siridb->buffer_fp) != 1) ? EOF : 0;
+ fwrite(buf, sz, 1, buffer->fp) != 1) ? EOF : 0;
}
/*
* Returns 0 if successful; -1 and a SIGNAL is raised in case an error occurred.
*/
-int siridb_buffer_new_series(siridb_t * siridb, siridb_series_t * series)
+int siridb_buffer_new_series(
+ siridb_buffer_t * buffer,
+ siridb_series_t * series)
{
/* allocate new buffer */
- series->buffer = siridb_points_new(siridb->buffer_len, series->tp);
+ series->buffer = siridb_points_new(buffer->len, series->tp);
if (series->buffer == NULL)
{
return -1; /* signal is raised */
}
- return (siridb->empty_buffers->len) ?
- buffer__use_empty(siridb, series) :
- buffer__create_new(siridb, series);
-}
-
-int siridb_buffer_fsync(siridb_t * siridb)
-{
- if (siridb->buffer_fp == NULL)
- {
- return 0;
- }
- int buffer_fd = fileno(siridb->buffer_fp);
- return (buffer_fd != -1) ? fsync(buffer_fd) : -1;
+ return (buffer->empty->len) ?
+ buffer__use_empty(buffer, series) :
+ buffer__create_new(buffer, series);
}
/*
* Returns 0 if successful or -1 in case of an error.
*/
-int siridb_buffer_open(siridb_t * siridb)
+int siridb_buffer_open(siridb_buffer_t * buffer)
{
- int buffer_fd, rc;
- siridb_misc_get_fn(fn, siridb->buffer_path, SIRIDB_BUFFER_FN)
+ const int flags = POSIX_FADV_RANDOM | POSIX_FADV_DONTNEED;
+ int rc;
+ siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN)
- if ((siridb->buffer_fp = fopen(fn, "r+")) == NULL)
+ if ((buffer->fp = fopen(fn, "r+")) == NULL)
{
log_critical("Cannot open '%s' for reading and writing", fn);
return -1;
}
- buffer_fd = fileno(siridb->buffer_fp);
+ buffer->fd = fileno(buffer->fp);
- if (buffer_fd == -1)
+ if (buffer->fd == -1)
{
log_critical("Cannot get file descriptor: '%s'", fn);
+ fclose(buffer->fp);
+ buffer->fp = NULL;
return -1;
}
#ifdef __APPLE__
rc = 0; /* no posix_fadvise on apple */
#else
- rc = posix_fadvise(buffer_fd, 0, 0, POSIX_FADV_RANDOM|POSIX_FADV_DONTNEED);
+ rc = posix_fadvise(buffer->fd, 0, 0, flags);
if (rc)
{
log_warning("Cannot set advice for file access: '%s' (%d)", fn, rc);
*/
int siridb_buffer_load(siridb_t * siridb)
{
+ siridb_buffer_t * buffer = siridb->buffer;
FILE * fp;
FILE * fp_temp;
size_t read_at_once = 8;
size_t num, i;
- char buffer[siridb->buffer_size * read_at_once];
+ char buf[buffer->size * read_at_once];
char * pt, * end;
long int offset = 0;
siridb_series_t * series;
log_info("Loading and cleanup buffer");
- siridb->buffer_clear = malloc(siridb->buffer_size);
- if (siridb->buffer_clear == NULL)
+ buffer->template = malloc(buffer->size);
+ if (buffer->template == NULL)
{
log_critical("Allocation error while loading buffer");
return -1;
}
- for ( pt = siridb->buffer_clear,
- end = siridb->buffer_clear + siridb->buffer_size;
+ for ( pt = buffer->template,
+ end = buffer->template + buffer->size;
pt < end;
pt += sizeof(uint64_t))
{
memcpy(pt, &buffer__end, sizeof(uint64_t));
}
- memcpy(siridb->buffer_clear, &buffer__start, sizeof(uint32_t));
+ memcpy(buffer->template, &buffer__start, sizeof(uint32_t));
- siridb_misc_get_fn(fn, siridb->buffer_path, SIRIDB_BUFFER_FN)
- siridb_misc_get_fn(fn_temp, siridb->buffer_path, "__" SIRIDB_BUFFER_FN)
+ siridb_misc_get_fn(fn, buffer->path, SIRIDB_BUFFER_FN)
+ siridb_misc_get_fn(fn_temp, buffer->path, "__" SIRIDB_BUFFER_FN)
if (xpath_file_exist(fn_temp))
{
return -1;
}
- while ((num = fread(buffer, siridb->buffer_size, read_at_once, fp)))
+ while ((num = fread(buf, buffer->size, read_at_once, fp)))
{
for (i = 0; i < num; i++)
{
- pt = buffer + i * siridb->buffer_size;
+ pt = buf + i * buffer->size;
buf_start = *((uint32_t *) pt);
if (buf_start != buffer__start)
log_warning("Buffer will be migrated");
log_migrate = 0;
}
- buffer__migrate_to_new(pt, siridb->buffer_size);
+ buffer__migrate_to_new(pt, buffer->size);
}
pt += sizeof(uint32_t);
continue;
}
- series->buffer = siridb_points_new(siridb->buffer_len, series->tp);
+ series->buffer = siridb_points_new(buffer->len, series->tp);
if (series->buffer == NULL)
{
log_critical("Cannot allocate a buffer for series id %u",
siridb_points_add_point(series->buffer, ts, val);
}
- offset += siridb->buffer_size;
+ offset += buffer->size;
/* increment series->length which is 0 at this time */
series->length += series->buffer->len;
/* write to output file and check if write was successful */
- if ((fwrite(buffer + i * siridb->buffer_size,
- siridb->buffer_size, 1, fp_temp) != 1))
+ if ((fwrite(buf + i*buffer->size, buffer->size, 1, fp_temp) != 1))
{
log_critical("Could not write to temporary buffer file: '%s'",
fn_temp);
return 0;
}
-void siridb_buffer_free(siridb_t * siridb)
-{
- if (siridb->buffer_fp != NULL)
- {
- fclose(siridb->buffer_fp);
- siridb->buffer_fp = NULL;
- }
- free(siridb->buffer_clear);
- siridb->buffer_clear = NULL;
-}
-
/*
* Reserve a space in the buffer for a new series. The position of this space
* in the buffer is read from siridb->empty_buffers so this list must have
* Note that an available spot must be checked before calling this function.
* This functions has undefined behavior if no spot is found.
*/
-static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series)
+static int buffer__use_empty(
+ siridb_buffer_t * buffer,
+ siridb_series_t * series)
{
- series->bf_offset = (long int) slist_pop(siridb->empty_buffers);
+ series->bf_offset = (long int) slist_pop(buffer->empty);
- if (siridb_buffer_write_empty(siridb, series))
+ if (siridb_buffer_write_empty(buffer, series))
{
ERR_FILE
return -1;
*
* Returns 0 if successful or -1 and a signal is raised in case of an error.
*/
-static int buffer__create_new(siridb_t * siridb, siridb_series_t * series)
+static int buffer__create_new(
+ siridb_buffer_t * buffer,
+ siridb_series_t * series)
{
long int buffer_pos;
- /* get file descriptor */
- int buffer_fd = fileno(siridb->buffer_fp);
- if (buffer_fd == -1)
- {
- ERR_FILE
- return -1;
- }
/* jump to end of buffer */
- if (fseeko(siridb->buffer_fp, 0, SEEK_END))
+ if (fseeko(buffer->fp, 0, SEEK_END))
{
ERR_FILE
return -1;
}
/* bind the current offset to the new series */
- if ((series->bf_offset = ftello(siridb->buffer_fp)) == -1)
+ if ((series->bf_offset = ftello(buffer->fp)) == -1)
{
ERR_FILE
return -1;
}
/* write buffer start and series ID to buffer */
- if (siridb_buffer_write_empty(siridb, series))
+ if (siridb_buffer_write_empty(buffer, series))
{
ERR_FILE
return -1;
}
- buffer_pos = series->bf_offset + siridb->buffer_size * SIRIDB_BUFFER_CACHE;
+ buffer_pos = series->bf_offset + buffer->size * SIRIDB_BUFFER_CACHE;
/* fill buffer with zeros if possible */
- if (ftruncate(buffer_fd, buffer_pos))
+ if (ftruncate(buffer->fd, buffer_pos))
{
ERR_FILE
return -1;
}
/* commit changes to disk */
- if (fsync(buffer_fd))
+ if (fsync(buffer->fd))
{
ERR_FILE
return -1;
}
- while ((buffer_pos -= siridb->buffer_size) > series->bf_offset)
+ while ((buffer_pos -= buffer->size) > series->bf_offset)
{
- slist_append_safe(&siridb->empty_buffers, (void *) buffer_pos);
+ slist_append_safe(&buffer->empty, (void *) buffer_pos);
}
return 0;
*
*/
-static siridb_t * SIRIDB_new(void);
-
-static int SIRIDB_from_unpacker(
+static siridb_t * siridb__new(void);
+static int siridb__from_unpacker(
qp_unpacker_t * unpacker,
siridb_t ** siridb,
const char * dbpath,
char * err_msg);
+static siridb_t * siridb__from_dat(const char * dbpath);
+static int siridb__read_conf(siridb_t * siridb);
+static int siridb__lock(const char * dbpath, int lock_flags);
#define READ_DB_EXIT_WITH_ERROR(ERROR_MSG) \
strcpy(err_msg, ERROR_MSG); \
return (idle > 100) ? 100 : idle;
}
-
/*
* Check if at least database.conf and database.dat exist in the path.
*/
siridb_t * siridb_new(const char * dbpath, int lock_flags)
{
size_t len = strlen(dbpath);
- lock_t lock_rc;
- char buffer[XPATH_MAX];
- cfgparser_t * cfgparser;
- cfgparser_option_t * option = NULL;
- qp_unpacker_t * unpacker;
siridb_t * siridb;
- char err_msg[512];
- int rc;
size_t i;
if (!len || dbpath[len - 1] != '/')
return NULL;
}
- lock_rc = lock_lock(dbpath, lock_flags);
-
- switch (lock_rc)
- {
- case LOCK_IS_LOCKED_ERR:
- case LOCK_PROCESS_NAME_ERR:
- case LOCK_WRITE_ERR:
- case LOCK_READ_ERR:
- case LOCK_MEM_ALLOC_ERR:
- log_error("%s (%s)", lock_str(lock_rc), dbpath);
- return NULL;
- case LOCK_NEW:
- log_info("%s (%s)", lock_str(lock_rc), dbpath);
- break;
- case LOCK_OVERWRITE:
- log_warning("%s (%s)", lock_str(lock_rc), dbpath);
- break;
- default:
- assert (0);
- break;
- }
-
- /* read database.conf */
- snprintf(buffer,
- XPATH_MAX,
- "%sdatabase.conf",
- dbpath);
-
- cfgparser = cfgparser_new();
- if (cfgparser == NULL)
- {
- return NULL; /* signal is raised */
- }
- if ((rc = cfgparser_read(cfgparser, buffer)) != CFGPARSER_SUCCESS)
- {
- log_error("Could not read '%s': %s",
- buffer,
- cfgparser_errmsg(rc));
- cfgparser_free(cfgparser);
- return NULL;
- }
-
- snprintf(buffer,
- XPATH_MAX,
- "%sdatabase.dat",
- dbpath);
-
- if ((unpacker = qp_unpacker_ff(buffer)) == NULL)
- {
- /* qp_unpacker has done some logging */
- cfgparser_free(cfgparser);
- return NULL;
- }
-
- if ((rc = SIRIDB_from_unpacker(
- unpacker,
- &siridb,
- dbpath,
- err_msg)) < 0)
+ if (siridb__lock(dbpath, lock_flags))
{
- log_error("Could not read '%s': %s", buffer, err_msg);
- qp_unpacker_ff_free(unpacker);
- cfgparser_free(cfgparser);
+ log_error("Cannot lock database path '%s'", dbpath);
return NULL;
}
- qp_unpacker_ff_free(unpacker);
-
- if (rc > 0 && siridb_save(siridb))
+ siridb = siridb__from_dat(dbpath);
+ if (siridb == NULL)
{
- log_error("Could not write file: %s", buffer);
- cfgparser_free(cfgparser);
- siridb_decref(siridb);
+ log_error("Cannot load SiriDB from database path '%s'", dbpath);
return NULL;
}
log_info("Start loading database: '%s'", siridb->dbname);
- /* read buffer_path from database.conf */
- rc = cfgparser_get_option(
- &option,
- cfgparser,
- "buffer",
- "path");
-
- if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_STRING)
- {
- len = strlen(option->val->string);
- siridb->buffer_path = NULL;
- if (option->val->string[len - 1] == '/')
- {
- siridb->buffer_path = strdup(option->val->string);
- }
- else if (asprintf(
- &siridb->buffer_path,
- "%s/",
- option->val->string) < 0)
- {
- siridb->buffer_path = NULL;
- }
- }
- else
- {
- siridb->buffer_path = siridb->dbpath;
- }
-
- /* free cfgparser */
- cfgparser_free(cfgparser);
-
- if (siridb->buffer_path == NULL)
+ /* read database.conf */
+ if (siridb__read_conf(siridb))
{
- ERR_ALLOC
+ log_error("Could not read config for database '%s'", siridb->dbname);
siridb_decref(siridb);
return NULL;
}
return NULL;
}
- /* load buffer */
- if (siridb_buffer_load(siridb))
+ /* load shards */
+ if (siridb_shards_load(siridb))
{
- log_error("Could not read buffer for database '%s'", siridb->dbname);
+ log_error("Could not read shards for database '%s'", siridb->dbname);
siridb_decref(siridb);
return NULL;
}
- /* open buffer */
- if (siridb_buffer_open(siridb))
+ /* load buffer */
+ if (siridb_buffer_load(siridb))
{
- log_error("Could not open buffer for database '%s'", siridb->dbname);
+ log_error("Could not read buffer for database '%s'", siridb->dbname);
siridb_decref(siridb);
return NULL;
}
- /* load shards */
- if (siridb_shards_load(siridb))
+ /* open buffer */
+ if (siridb_buffer_open(siridb->buffer))
{
- log_error("Could not read shards for database '%s'", siridb->dbname);
+ log_error("Could not open buffer for database '%s'", siridb->dbname);
siridb_decref(siridb);
return NULL;
}
*
* (a SIGNAL can be set in case of an error)
*/
-static int SIRIDB_from_unpacker(
+static int siridb__from_unpacker(
qp_unpacker_t * unpacker,
siridb_t ** siridb,
const char * dbpath,
}
/* create a new SiriDB structure */
- *siridb = SIRIDB_new();
+ *siridb = siridb__new();
if (*siridb == NULL)
{
sprintf(err_msg, "Cannot create SiriDB instance.");
}
/* bind buffer size and len to SiriDB */
- (*siridb)->buffer_size = (size_t) qp_obj.via.int64;
- (*siridb)->buffer_len = (*siridb)->buffer_size / sizeof(siridb_point_t);
+ (*siridb)->buffer->size = (size_t) qp_obj.via.int64;
+ (*siridb)->buffer->len = (*siridb)->buffer->size / sizeof(siridb_point_t);
/* read number duration */
if (qp_next(unpacker, &qp_obj) != QP_INT64)
*/
int siridb_open_files(siridb_t * siridb)
{
- int open_files = procinfo_open_files(siridb->dbpath);
-
- if ( siridb->buffer_path != siridb->dbpath &&
- strncmp(
- siridb->dbpath,
- siridb->buffer_path,
- strlen(siridb->dbpath)))
- {
- open_files += procinfo_open_files(siridb->buffer_path);
- }
-
- return open_files;
+ siridb_buffer_t * buffer = siridb->buffer;
+ return procinfo_open_files(
+ siridb->dbpath,
+ (buffer->fp == NULL) ? -1 : buffer->fd);
}
/*
qp_fadd_raw(fpacker, (const unsigned char *) siridb->uuid, 16) ||
qp_fadd_string(fpacker, siridb->dbname) ||
qp_fadd_int8(fpacker, siridb->time->precision) ||
- qp_fadd_int64(fpacker, siridb->buffer_size) ||
+ qp_fadd_int64(fpacker, siridb->buffer->size) ||
qp_fadd_int64(fpacker, siridb->duration_num) ||
qp_fadd_int64(fpacker, siridb->duration_log) ||
qp_fadd_string(fpacker, iso8601_tzname(siridb->tz)) ||
#endif
/* first we should close the buffer and all other open files */
- siridb_buffer_free(siridb);
+ if (siridb->buffer != NULL)
+ {
+ siridb_buffer_free(siridb->buffer);
+ }
if (siridb->dropped_fp != NULL)
{
siridb_users_free(siridb->users);
}
- /* free buffer positions */
- slist_free(siridb->empty_buffers);
-
/* we do not need to free server and replica since they exist in
* this list and therefore will be freed.
*/
imap_free(siridb->shards, (imap_free_cb) &siridb__shard_decref);
}
- /* only free buffer path when not equal to db_path */
- if (siridb->buffer_path != siridb->dbpath)
- {
- free(siridb->buffer_path);
- }
-
if (siridb->groups != NULL)
{
siridb_groups_decref(siridb->groups);
/*
* Returns NULL and raises a SIGNAL in case an error has occurred.
*/
-static siridb_t * SIRIDB_new(void)
+static siridb_t * siridb__new(void)
{
siridb_t * siridb = (siridb_t *) malloc(sizeof(siridb_t));
if (siridb == NULL)
}
else
{
- /* allocate a list for buffer positions */
- siridb->empty_buffers = slist_new(SLIST_DEFAULT_SIZE);
- if (siridb->empty_buffers == NULL)
+ /* allocate a buffer */
+ siridb->buffer = siridb_buffer_new();
+ if (siridb->buffer == NULL)
{
imap_free(siridb->shards, NULL);
imap_free(siridb->series_map, NULL);
siridb->ref = 1;
siridb->insert_tasks = 0;
siridb->flags = 0;
- siridb->buffer_path = NULL;
siridb->time = NULL;
siridb->users = NULL;
siridb->servers = NULL;
siridb->drop_threshold = DEF_DROP_THRESHOLD;
siridb->select_points_limit = DEF_SELECT_POINTS_LIMIT;
siridb->list_limit = DEF_LIST_LIMIT;
- siridb->buffer_size = -1;
siridb->tz = -1;
siridb->server = NULL;
siridb->replica = NULL;
siridb->groups = NULL;
/* make file pointers are NULL when file is closed */
- siridb->buffer_fp = NULL;
siridb->dropped_fp = NULL;
siridb->store = NULL;
return siridb;
}
+static siridb_t * siridb__from_dat(const char * dbpath)
+{
+ int rc;
+ siridb_t * siridb = NULL;
+ char err_msg[512];
+ qp_unpacker_t * unpacker;
+ char buffer[XPATH_MAX];
+
+ snprintf(buffer,
+ XPATH_MAX,
+ "%sdatabase.dat",
+ dbpath);
+
+ unpacker = qp_unpacker_ff(buffer);
+ if (unpacker == NULL)
+ {
+ return NULL;
+ }
+
+ if ((rc = siridb__from_unpacker(
+ unpacker,
+ &siridb,
+ dbpath,
+ err_msg)) < 0)
+ {
+ log_error("Could not read '%s': %s", buffer, err_msg);
+ qp_unpacker_ff_free(unpacker);
+ return NULL;
+ }
+
+ qp_unpacker_ff_free(unpacker);
+
+ if (rc > 0 && siridb_save(siridb))
+ {
+ log_error("Could not write file: %s", buffer);
+ siridb_decref(siridb);
+ return NULL;
+ }
+
+ return siridb;
+}
+
+static int siridb__read_conf(siridb_t * siridb)
+{
+ int rc;
+ char buf[XPATH_MAX];
+ cfgparser_t * cfgparser;
+ cfgparser_option_t * option = NULL;
+ siridb_buffer_t * buffer = siridb->buffer;
+ snprintf(buf,
+ XPATH_MAX,
+ "%sdatabase.conf",
+ siridb->dbpath);
+
+ cfgparser = cfgparser_new();
+ if (cfgparser == NULL)
+ {
+ return -1; /* signal is raised */
+ }
+
+ rc = cfgparser_read(cfgparser, buf);
+
+ if (rc != CFGPARSER_SUCCESS)
+ {
+ log_error("Could not read '%s': %s", buf, cfgparser_errmsg(rc));
+ cfgparser_free(cfgparser);
+ return -1;
+ }
+
+ /* read buffer_path from database.conf */
+ rc = cfgparser_get_option(&option, cfgparser, "buffer", "path");
+ if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_STRING)
+ {
+ size_t len = strlen(option->val->string);
+ buffer->path = NULL;
+ if (option->val->string[len - 1] == '/')
+ {
+ buffer->path = strdup(option->val->string);
+ }
+ else if (
+ len >= 11 &&
+ strcmp(option->val->string + (len-11), "/buffer.dat") == 0)
+ {
+ buffer->path = strndup(option->val->string, len-10);
+ }
+ else if (asprintf(&buffer->path, "%s/", option->val->string) < 0)
+ {
+ buffer->path = NULL;
+ }
+ }
+ else
+ {
+ buffer->path = strdup(siridb->dbpath);
+ }
+
+ rc = cfgparser_get_option(&option, cfgparser, "buffer", "size");
+ if (rc == CFGPARSER_SUCCESS && option->tp == CFGPARSER_TP_INTEGER)
+ {
+
+ }
+
+ cfgparser_free(cfgparser);
+
+ return (buffer->path == NULL) ? -1 : 0;
+}
+
+static int siridb__lock(const char * dbpath, int lock_flags)
+{
+ lock_t lock_rc = lock_lock(dbpath, lock_flags);
+
+ switch (lock_rc)
+ {
+ case LOCK_IS_LOCKED_ERR:
+ case LOCK_PROCESS_NAME_ERR:
+ case LOCK_WRITE_ERR:
+ case LOCK_READ_ERR:
+ case LOCK_MEM_ALLOC_ERR:
+ log_error("%s (%s)", lock_str(lock_rc), dbpath);
+ return -1;
+ case LOCK_NEW:
+ log_info("%s (%s)", lock_str(lock_rc), dbpath);
+ break;
+ case LOCK_OVERWRITE:
+ log_warning("%s (%s)", lock_str(lock_rc), dbpath);
+ break;
+ default:
+ assert (0);
+ break;
+ }
+ return 0;
+}
siridb = ilocal->siridb;
- if (siridb->buffer_fp == NULL && siridb_buffer_open(siridb))
+ if (siridb->buffer->fp == NULL && siridb_buffer_open(siridb->buffer))
{
ERR_FILE
ilocal->status = INSERT_LOCAL_ERROR;
if (siri.buffersync == NULL)
{
- if (siridb_buffer_fsync(siridb))
+ if (siridb_buffer_fsync(siridb->buffer))
{
log_critical("fsync() has failed on the buffer file");
}
int map)
{
SIRIDB_PROP_MAP("buffer_path", 11)
- qp_add_string(packer, siridb->buffer_path);
+ qp_add_string(packer, siridb->buffer->path);
}
static void prop_buffer_size(
int map)
{
SIRIDB_PROP_MAP("buffer_size", 11)
- qp_add_int32(packer, (int32_t) siridb->buffer_size);
+ qp_add_int32(packer, (int32_t) siridb->buffer->size);
}
static void prop_dbname(
*/
siridb_points_add_point(series->buffer, ts, val);
- if (series->buffer->len == siridb->buffer_len)
+ if (series->buffer->len == siridb->buffer->len)
{
if (siridb_shards_add_points(
siridb,
else
{
series->buffer->len = 0;
- if (siridb_buffer_write_empty(siridb, series))
+ if (siridb_buffer_write_empty(siridb->buffer, series))
{
ERR_FILE
rc = -1;
}
else
{
- if (siridb_buffer_write_last_point(siridb, series))
+ if (siridb_buffer_write_last_point(siridb->buffer, series))
{
ERR_FILE
log_critical("Cannot write new point to buffer");
siridb_series_t *__restrict series,
siridb_pcache_t *__restrict pcache)
{
- if (pcache->len > siridb->buffer_len || series->buffer == NULL)
+ if (pcache->len > siridb->buffer->len || series->buffer == NULL)
{
series->length += pcache->len;
(siridb_points_t *) pcache);
}
- if (pcache->len + series->buffer->len > siridb->buffer_len)
+ if (pcache->len + series->buffer->len > siridb->buffer->len)
{
series->length += pcache->len;
}
series->buffer->len = 0;
- if (siridb_buffer_write_empty(siridb, series))
+ if (siridb_buffer_write_empty(siridb->buffer, series))
{
ERR_FILE
return -1;
}
/* create a buffer for series (except string series) */
- if (tp != TP_STRING && siridb_buffer_new_series(siridb, series))
+ if (tp != TP_STRING && siridb_buffer_new_series(siridb->buffer, series))
{
/* signal is raised */
log_critical("Could not create buffer for series '%s'.",
if (series->flags & SIRIDB_SERIES_IS_DROPPED)
{
slist_append_safe(
- &series->siridb->empty_buffers,
+ &series->siridb->buffer->empty,
(void *) series->bf_offset);
}
}
qp_add_int8(packer, siri.cfg->ip_support) ||
qp_add_string_term(packer, uv_version_string()) ||
qp_add_string_term(packer, siridb->dbpath) ||
- qp_add_string_term(packer, siridb->buffer_path) ||
- qp_add_int64(packer, (int64_t) siridb->buffer_size) ||
+ qp_add_string_term(packer, siridb->buffer->path) ||
+ qp_add_int64(packer, (int64_t) siridb->buffer->size) ||
qp_add_int32(packer, (int32_t) siri.startup_time) ||
qp_add_string_term(packer, siridb->server->address) ||
qp_add_int32(packer, (int32_t) siridb->server->port))
return cexpr_str_cmp(
cond->operator,
(wserver->siridb->server == wserver->server) ?
- wserver->siridb->buffer_path :
+ wserver->siridb->buffer->path :
(wserver->server->buffer_path != NULL) ?
wserver->server->buffer_path : "",
cond->str);
return cexpr_int_cmp(
cond->operator,
(wserver->siridb->server == wserver->server) ?
- wserver->siridb->buffer_size :
+ wserver->siridb->buffer->size :
wserver->server->buffer_size,
cond->int64);
qp_add_string(
query->packer,
(siridb->server == server) ?
- siridb->buffer_path :
+ siridb->buffer->path :
(server->buffer_path != NULL) ?
server->buffer_path : "");
break;
qp_add_int64(
query->packer,
(siridb->server == server) ?
- siridb->buffer_size : server->buffer_size);
+ siridb->buffer->size : server->buffer_size);
break;
case CLERI_GID_K_DBPATH:
qp_add_string(